package main

import (
	"context"
	"gitee.com/k8sio/cnkit/command"
	"gitee.com/k8sio/cnkit/logger"
	"gitee.com/k8sio/cnkit/payloadhub"
	log "github.com/sirupsen/logrus"
	"math"
	"os"
	"syscall"
	"time"
)

type exampleInstance struct {
	i      int
	w      []int
	n      int
	ctx    context.Context
	cancel context.CancelFunc
	hub    *payloadhub.PayloadHub
}

func (e *exampleInstance) Initialize() error {
	log.Info("initialize...")
	e.ctx, e.cancel = context.WithCancel(context.Background())
	e.hub = payloadhub.NewPayloadHub(payloadhub.WithSize(100000))
	return nil
}

func Percent(all, percent int) int {
	if percent < 0 {
		percent = 1
	}
	if percent > 100 {
		percent = 100
	}
	a := float64(percent) / 100
	n := math.Floor(float64(all) * a)
	if n == 0 {
		n++
	}
	return int(n)
}

func (e *exampleInstance) getReliableIndexSection() int {
	return Percent(e.n, 50)
}

func (e *exampleInstance) RunLoop() {
	log.Info("run_loop...")

	if e.n <= 0 {
		e.n = 10
	}
	e.w = make([]int, e.n)

	reliableMax := e.getReliableIndexSection()
	// 启动监听者 WithReliable true
	for i := 0; i < reliableMax; i++ {
		go func(index int) {
			w := e.hub.NewPayloadHubWatcher(payloadhub.WithReliable(true))
			log.Infof("start watcher %05d   reliable", index)
			w.Watcher(func(payload interface{}) {
				// 在这里处理新数据
				//log.Println(index, "新数据到来:", payload)
				e.w[index]++
			})
		}(i)
	}

	// 启动监听者 WithReliable false
	for i := reliableMax; i < e.n; i++ {
		go func(index int) {
			w := e.hub.NewPayloadHubWatcher()
			log.Infof("start watcher %05d unreliable", index)
			w.Watcher(func(payload interface{}) {
				// 在这里处理新数据
				//log.Println(index, "新数据到来:", payload)
				e.w[index]++
			})
		}(i)
	}

	time.Sleep(1 * time.Second)
	go func() {
		defer func() {
			log.Info("quit add base")
		}()
		for e.hub.IsRunning() {
			select {
			case <-e.ctx.Done():
				return
			default:
			}
			if e.hub.Push(e.i) {
				e.i++
				//log.Infof("add %d", i)
			}
			//runtime.Gosched()
		}
	}()

}

func (e *exampleInstance) Destroy() {
	log.Info("destroy...")
	e.cancel()
	if e.hub != nil {
		e.hub.Stop()
	}
	log.Info("hub stopped")
	log.Infof("PayloadHub ...... pushed count %d", e.i)

	reliableMax := e.getReliableIndexSection()
	for i := 0; i < reliableMax; i++ {
		log.Infof("watcher %05d   reliable count %d", i, e.w[i])
	}
	for i := reliableMax; i < e.n; i++ {
		log.Infof("watcher %05d unreliable count %d", i, e.w[i])
	}
}

func SendSignalToSelf(after time.Duration, sig syscall.Signal) error {
	pid := syscall.Getpid()
	proc, err := os.FindProcess(pid)
	if err != nil {
		return err
	}

	log.Infof("Stop after %v", after)

	go func() {
		time.Sleep(after)
		err = proc.Signal(sig)
		if err != nil {
			log.Warnf("proc.Signal error: %s", err)
			log.Infof("use manual exit...")
			example.Destroy()
			os.Exit(0)
		}
	}()

	log.Info("SendSignalToSelf setup ok")

	return nil
}

func init() {
	logger.Init(
		logger.WithReportCaller(true),
		logger.WithLevel(log.InfoLevel),
		logger.WithGoId(true),
		logger.WithGoIdWidth(4),
		//logger.WithTimeFormat("2006/01/02 15:04:05.000000Z0700"),
		//logger.WithUTC(true),
	)
}

var (
	example exampleInstance
)

func main() {
	//example.watN = 100
	SendSignalToSelf(time.Second*5, syscall.SIGINT)
	command.Run(&example)
}
